[HUDI-3475] Initialize hudi table management module #5926
yuzhaojing wants to merge 1 commit into apache:master
Conversation
@yuzhaojing - I have given @prasannarajaperumal full context on this, and he will take over. I will check in / chime in as needed as well.

Ok, will communicate with him.

@prasannarajaperumal Can you take a look at this PR?
prasannarajaperumal left a comment:
On a more general note - how are these API calls triggered? TableManagementService would ideally monitor all the Hudi datasets, keep track of the various stats, and have configurations to trigger each of these table management actions. Are you planning on doing this as a follow-up?
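For illustration, a rough sketch of what stats-driven triggering could look like; every name below is an assumption for the sketch and not part of this PR:

import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only: a monitoring loop could collect per-table stats and
// map them to table management actions based on configured thresholds.
public class TriggerEvaluatorSketch {

  enum ActionType { COMPACTION, CLUSTERING, CLEAN }

  // e.g. trigger compaction once enough delta commits have accumulated since the last one
  static List<ActionType> evaluate(Map<String, Long> tableStats, long deltaCommitThreshold) {
    long deltaCommits = tableStats.getOrDefault("deltaCommitsSinceLastCompaction", 0L);
    if (deltaCommits >= deltaCommitThreshold) {
      return Collections.singletonList(ActionType.COMPACTION);
    }
    return Collections.emptyList();
  }
}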
package org.apache.hudi.table.management.entity;

public enum InstanceStatus {

Maybe we can split State (SCHEDULED, RUNNING, COMPLETED) and Status (SUCCESS, FAIL). A completed state can be either a success or a failure.
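A minimal sketch of that split (package taken from the snippet above; everything else is illustrative):

package org.apache.hudi.table.management.entity;

// Lifecycle state of a table management instance.
enum InstanceState {
  SCHEDULED,
  RUNNING,
  COMPLETED
}

// Outcome recorded once the instance reaches COMPLETED.
enum InstanceStatus {
  SUCCESS,
  FAIL
}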
      break;
    case FLINK:
    default:
      throw new IllegalStateException("Unexpected value: " + instance.getExecutionEngine());

Engine {0} not supported
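For example, something along these lines (same variables as the snippet above):

// Sketch: surface which engine was requested instead of a generic "Unexpected value".
throw new IllegalStateException(
    String.format("Engine %s not supported", instance.getExecutionEngine()));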
@Override
public Map<String, String> getJobParams(Instance instance) {
  Map<String, String> sparkParams = new HashMap<>();

We could possibly register an execution engine and its config with a separate API endpoint and use that engineConfig as an input to run a table service. ServiceConfig is not generalized to any engine today - would rather get this as an input through the REST API.
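For illustration only, a rough sketch of that idea; the endpoint and class names are assumptions, not anything in this PR:

import java.util.HashMap;
import java.util.Map;

// Hypothetical: an engine config registered once via REST could be merged into the
// job params instead of hard-coding Spark-specific settings in ServiceConfig.
public class EngineConfigRegistrySketch {
  private final Map<String, Map<String, String>> engineConfigs = new HashMap<>();

  // e.g. POST /v1/engine/{name}/config
  public void registerEngineConfig(String engineName, Map<String, String> config) {
    engineConfigs.put(engineName, new HashMap<>(config));
  }

  public Map<String, String> getJobParams(String engineName) {
    return engineConfigs.getOrDefault(engineName, new HashMap<>());
  }
}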
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Instance {

Would TableManagementAction be a better name for this? Also rename Action to ActionType.
import java.util.List;

public class RelationDBBasedStore implements MetadataStore {

Ideally we want this to be built over the HudiTimeline persistence of RFC-36 (metastore server). These two RFCs share the persistence layer. I am okay with tracking this as an action item and proceeding here.
  <version>1.7.25</version>
</dependency>

<dependency>

Can you check the license of this third-party dependency? It seems to be Apache 2.0 (https://github.com/brettwooldridge/HikariCP/blob/dev/LICENSE), but I want to make sure.
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Instance {

Javadoc and comments on the fields used here would be helpful.
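For example (field names taken from elsewhere in this PR; the exact set of fields is assumed):

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;

@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Instance {
  /** Database the target table belongs to. */
  private String dbName;

  /** Name of the Hudi table this action operates on. */
  private String tableName;

  /** Instant time of the table service action being run. */
  private String instant;
}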
Force-pushed from 4fbf616 to 9c63087.
xushiyan left a comment:
Roughly went through the code base and commented on some improvements. Will continue the review tomorrow.
import java.util.Calendar;
import java.util.Date;

public class DateTimeUtils {

Can we consolidate this with org.apache.hudi.common.util.DateTimeUtils? And please add some UTs.
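A sketch of the kind of UT being asked for, assuming this module's DateTimeUtils.addDay(int) returns a java.util.Date as used elsewhere in the PR:

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Calendar;
import java.util.Date;
import org.junit.jupiter.api.Test;

// Sketch only: exercises the addDay(int) helper seen elsewhere in this PR.
public class TestDateTimeUtils {

  @Test
  public void testAddDayShiftsByWholeDays() {
    Date threeDaysAgo = DateTimeUtils.addDay(-3);

    Calendar expected = Calendar.getInstance();
    expected.add(Calendar.DAY_OF_MONTH, -3);

    // Allow a small delta because the two "now" readings differ by a few milliseconds.
    long deltaMs = Math.abs(expected.getTimeInMillis() - threeDaysAgo.getTime());
    assertTrue(deltaMs < 1000, "addDay(-3) should be roughly 3 days before now");
  }
}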
import java.sql.PreparedStatement;
import java.util.stream.Collectors;

public class SqlSessionFactoryUtil {

Can we consolidate this with org.apache.hudi.metaserver.store.jdbc.SqlSessionFactoryUtils? And more UTs?

In a follow-up PR, hudi-platform-common will be extracted for unification; let us follow up there.
}

public static void main(String[] args) throws Exception {
  System.out.println("SPARK_HOME = " + System.getenv("SPARK_HOME"));

Can you review all occurrences of System.out/err.print and replace them with a logger?
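A minimal sketch of that change using the SLF4J API (the class name below is only for illustration; if the module standardizes on a different logging facade, the same pattern applies):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch: replace direct System.out calls with a class-level logger.
public class SparkEngineLauncher {  // hypothetical class name
  private static final Logger LOG = LoggerFactory.getLogger(SparkEngineLauncher.class);

  public static void main(String[] args) throws Exception {
    LOG.info("SPARK_HOME = {}", System.getenv("SPARK_HOME"));
  }
}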
@Override
public void handle(@NotNull Context context) throws Exception {
  boolean success = true;
  long beginTs = System.currentTimeMillis();

Use org.apache.hudi.common.util.HoodieTimer as the standard way to measure code execution time.
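A sketch of the suggestion, assuming the HoodieTimer startTimer()/endTimer() pattern (construction details may differ across Hudi versions):

import org.apache.hudi.common.util.HoodieTimer;

// Sketch: start a timer at the top of handle() instead of capturing System.currentTimeMillis().
HoodieTimer timer = new HoodieTimer().startTimer();
try {
  // ... handle the request ...
} finally {
  long durationMs = timer.endTimer();  // elapsed time in milliseconds
  // report durationMs to metrics/response instead of diffing raw timestamps
}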
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSession;

public class JdbcMapper {

We should standardize the JDBC interaction, at least for the metaserver and TSM. We can have a follow-up JIRA for this - maybe a hudi-platform-service/hudi-platform-common module for common components and classes?
public void updateExecutionInfo(Instance instance) {
  int retryNum = 0;
  try {
    while (retryNum++ < 3) {

void init();

void startService();

Why not just call it start()? :)
import java.util.Map;

public abstract class ExecutionEngine {

Can you please review all new classes for the TSM models and add Javadoc to explain the use case?

Sure, will add docs.
import lombok.Getter;

import java.util.Date;

Please avoid java.util.Date, which is not thread-safe; let's change all usages to java.time.* instead. Also, for timestamps, can you check whether using Long everywhere would be better?
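A sketch of the suggested direction; the field names are illustrative:

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// java.time types are immutable and thread-safe, and an epoch-millis Long is
// trivially comparable and serializable for timestamps.
public class InstanceTimes {
  // Instead of: private Date updateTime;
  private Instant updateTime = Instant.now();

  // Or, if a plain numeric column is preferred:
  private Long updateTimeMs = System.currentTimeMillis();

  public LocalDateTime updateTimeLocal() {
    return LocalDateTime.ofInstant(updateTime, ZoneId.systemDefault());
  }
}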
@Parameter(names = {"-instance-submit-timeout-sec"}, description = "Instance Submit Timeout Sec")
public Integer instanceSubmitTimeoutSec = 600;

@Parameter(names = {"-spark-submit-jar-path"}, description = "Spark Submit Jar Path")

This is Spark-specific? It's under common/CommandConfig, so it had better be engine-agnostic.
<result column="update_time" property="updateTime" javaType="java.util.Date"/>
</resultMap>

<update id="createInstance">

It might be confused with saveInstance below; maybe we can call it createInstanceTable?
@Parameter(names = {"-instance-submit-timeout-sec"}, description = "Instance Submit Timeout Sec")
public Integer instanceSubmitTimeoutSec = 600;

@Parameter(names = {"-spark-submit-jar-path"}, description = "Spark Submit Jar Path")

Does spark-submit-jar refer to hudi-cli? Its parameters seem to be:
sparkLauncher.addAppArgs("COMPACT_RUN", master, sparkMemory, instance.getBasePath(),
    instance.getTableName(), instance.getInstant(), parallelism, "", maxRetryNum, "")

  sparkLauncher.setConf(entry.getKey(), entry.getValue());
}

sparkLauncher.addSparkArg("--queue", instance.getQueue());

Maybe we can use spark.yarn.queue and put it in the jobParams above?
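A sketch of that suggestion, assuming jobParams is the Map<String, String> built in getJobParams above:

// Pass the queue through Spark conf rather than a launcher argument.
jobParams.put("spark.yarn.queue", instance.getQueue());

// ...later, when building the launcher:
for (Map.Entry<String, String> entry : jobParams.entrySet()) {
  sparkLauncher.setConf(entry.getKey(), entry.getValue());
}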
// sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), master, sparkMemory, client.getBasePath(),
//     client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath,
//     retry, propsFilePath);
sparkLauncher.addAppArgs("COMPACT_RUN", master, sparkMemory, instance.getBasePath(),

COMPACT_RUN could be passed in as a parameter rather than hard-coded.
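A hedged sketch; toCommand() and getAction() are assumed helpers for illustration, not part of the PR:

// Take the command from the action type instead of hard-coding "COMPACT_RUN".
String sparkCommand = toCommand(instance.getAction());  // e.g. "COMPACT_RUN" for compaction
sparkLauncher.addAppArgs(sparkCommand, master, sparkMemory, instance.getBasePath(),
    instance.getTableName(), instance.getInstant(), parallelism, "", maxRetryNum, "");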
private int maxRetry;

private Date queryStartTime = DateTimeUtils.addDay(-3);
  throw new RuntimeException("Instance does not exist: " + instance);
}
// 2. update status
metadataStore.updateStatus(instance);

It feels like no changes have been made here.
<resultMap type="org.apache.hudi.table.service.manager.entity.Instance" id="InstanceMapping">
  <result column="id" property="id" javaType="java.lang.Long"/>
  <result column="db_name" property="dbName"/>
  <result column="table_name" property="tableName"/>

Was the tbl table dropped from the design? I noticed that the RFC uses tbl_id here.
prep step #6732